package SSF.OS.UDP.test;

/**
 * udpStreamServer.java
 *
 * Created: Sun Jun  6 15:29:06 1999 hongbol
 * Revised Wed Nov 3 1999 ato
 * @version 0.5
 */

import com.renesys.raceway.SSF.*;
import SSF.OS.*;
import SSF.OS.UDP.*;
import SSF.OS.Socket.*;
import com.renesys.raceway.DML.*;
import SSF.Net.*;
import SSF.Net.Util.*;

/** A simple UDP streaming server application provided as an example.
 *  The server configures itself from the DML file that
 *  specifies the server's wellKnownPort, the maximum datagram_size
 *  ( = payload in bytes) and the send_interval (in seconds) between
 *  sending consecutive chunks of virtual data to the client.
 *  <P>The client-server protocol is absolutely minimal: the client
 *  sends to server's well known address just one integer that specifies
 *  the total amount of virtual data it requests. After receiving
 *  a client's request, the server spawns a slave server that periodically
 *  sends the datagrams of size datagram_size to the client until
 *  the request is fulfilled.
 *  <P>The default is no limit on the number of clients, unless the attribute
 *  client_limit is set. <P>There
 *  is no limit on the number of servers that can coexist in a single Host
 *  as long as each udpServer has a distinct wellKnownPort number.
 *  <P>If datagram_size is chosen to be larger than allowed by either the local
 *  UDP protocol (udpinit.send_buffer_size), or by the remote UDP
 *  receive buffer (udpinit.rcv_buffer_size), the datagrams will
 *  be dropped - with some diagnostic output if the debug options
 *  are set to true. <P>The datagram_size and send_interval should be chosen
 *  so that the resulting data bandwidth does not exceed the bitrate
 *  specified on the server's Host link interface; otherwise the interface
 *  will queue the outgoing datagrams and may drop them if it is configured
 *  with a small enough buffer.
 */

public class udpStreamServer extends ProtocolSession {

  /********************* Attribute Variables *********************/

  /** port number used by this server */
  int wellKnownPort;

  /** default maximum server datagram size in bytes */
  int datagram_size = 1000;

  /** time between sending datagrams (in ssf ticks); 0 means back to back */
  long send_interval = 0;

  /** size of data request object from matching client */
  int request_size;

  /** "listening" socket of this server */
  udpSocket udpsocket;

  /** max number of clients of this server - default is unlimited */
  int client_limit = Integer.MAX_VALUE;

  /** current number of contemporaneous clients of this server */
  int clientNumber = 0;

  /** Host entity where this pseudo-protocol is installed */
  Host localHost;

  /** Host's IP address */
  int localIP;

  /** Host's NHI address */
  String localNHI;

  /** Host's Socket ProtocolSession */
  socketMaster socketms;

  /** default is don't print verbose debugging information */
  boolean debug = false;

  /** default is show summary session information */
  boolean show_report = true;

  /** one-element buffer handed to udpsocket.read(); on success its
   *  element 0 is expected to hold the client's request as an Integer */
  Object[] obj;

  /*************** The slave server inner class ***************/

  /** Serves a single client request: opens its own UDP socket towards
   *  the client and writes datagrams of at most datagram_size bytes,
   *  either paced by send_interval or back to back, until the requested
   *  number of bytes has been handed to the socket. UDP gives no delivery
   *  guarantee, so a datagram whose write fails is counted as lost and is
   *  not retransmitted.
   */
  class slaveServer extends ProtocolSession {
    /** the master server that spawned this slave */
    udpStreamServer owner;
    /** number of requested bytes not yet handed to the socket */
    int nbytesRemaining;
    /** total number of bytes requested by the client */
    int nbytesRequest;
    /** size in bytes of the next datagram to write */
    int dataSize;
    /** number of datagram writes attempted (successful or not) */
    int numDatagrams = 0;
    /** number of datagram writes that reported failure */
    int numFailures = 0;
    /** socket used to stream the data to the client */
    udpSocket sd;
    int srcIP;
    int srcPort;
    int destIP;
    int destPort;

    /** @param docsize     total number of bytes the client requested
     *  @param local_ip    local IP address to bind the data socket to
     *  @param local_port  local port to bind the data socket to
     *  @param remote_ip   client's IP address
     *  @param remote_port client's port
     */
    public slaveServer(int docsize, int local_ip,
                       int local_port, int remote_ip, int remote_port) {
      owner = udpStreamServer.this;
      nbytesRemaining = docsize;
      nbytesRequest = docsize;
      srcIP = local_ip;
      srcPort = local_port;
      destIP = remote_ip;
      destPort = remote_port;
      dataSize = owner.datagram_size;
    }

    /** Open, bind and connect the data socket, then start streaming.
     *  If the socket cannot be set up the request is abandoned and the
     *  client slot taken by the master server is released.
     */
    void start() {
      try {
        sd = (udpSocket)owner.socketms.socket(this, "udp");
        sd.bind(srcIP, srcPort);
        sd.connect(destIP, destPort, null);  // creates a UDP session
      } catch (ProtocolException e) {
        System.err.println(e);
        // without a usable socket there is nothing to stream on; give the
        // slot back so the failed request does not count against
        // client_limit forever
        --owner.clientNumber;
        return;
      }
      senddata();
    }

    /** Stream the requested data. With a positive send_interval one
     *  datagram is written per timer callback; otherwise all datagrams
     *  are written back to back. In both modes the session is closed once
     *  nothing remains to be sent.
     */
    void senddata() {
      if(owner.send_interval > 0){
        // send_interval is already in ticks; the delay given here is
        // superseded by the explicit set(...) calls below
        (new Timer(owner.inGraph(), owner.send_interval){
          public void callback(){
            if (nbytesRemaining > 0){
              writeNextDatagram();
              set(owner.send_interval);
            } else {
              closeSession();
              cancel();
            }
          }
        }).set(owner.send_interval);
      } else {
        // terminates because every write, successful or not, consumes
        // dataSize > 0 bytes of the request
        while(nbytesRemaining > 0)
          writeNextDatagram();
        closeSession();
      }
    }

    /** Write one datagram of min(dataSize, nbytesRemaining) bytes and
     *  account for it. A failed write still consumes its share of the
     *  request: the datagram is reported as failed and not retried.
     */
    private void writeNextDatagram() {
      if(nbytesRemaining < dataSize) dataSize = nbytesRemaining;
      sd.write(dataSize, new Continuation(){
        public void success() {
          nbytesRemaining -= dataSize;
          ++numDatagrams;
          if (debug)
            localInfo(" wrote " + dataSize + " bytes OK");
        }
        public void failure(int errno) {
          nbytesRemaining -= dataSize;
          ++numDatagrams;
          ++numFailures;
          if (debug) {
            if(errno == 1)
              localInfo(" write failed, exceeds max_datagram_size");
            if(errno == 2)
              localInfo(" write failed, dropped by IP or NIC");
          }
        }
      });
    }

    /** Close the data socket, release the client slot held in the master
     *  server and, if debugging, print the session summary. The slot is
     *  released even when closing the socket fails.
     */
    private void closeSession() {
      try {
        sd.close(null);
      } catch (ProtocolException e) {
        System.out.println(e);
      }
      --owner.clientNumber;
      if(debug)
        sessionInfo("sent " + nbytesRequest + "B"
                     + " #pkts " + numDatagrams
                     + " #failed " + numFailures);
    }

    /** preamble to slave server diagnostics */
    void localInfo(String str){
      System.out.println(owner.inGraph().now()/(double)SSF.Net.Net.seconds(1.0)
            + " udpServer " + IP_s.IPtoString(srcIP)
            + ":" + srcPort + " " + str);
    }

    /** preamble to end2end UDP session diagnostics */
    void sessionInfo(String str){
      System.out.println(owner.inGraph().now()/(double)SSF.Net.Net.seconds(1.0)
            + " udpServer " + IP_s.IPtoString(srcIP)
            + ":" + srcPort
            + " clnt " + IP_s.IPtoString(destIP)
            + ":" + destPort + " " + str);
    }

    /** Not used by the slave server; always returns false. */
    public boolean push(ProtocolMessage message, ProtocolSession fromSession)
      throws ProtocolException {
      return false;
    }
  } // end of slaveServer

  /************************* Constructors ************************/

  public udpStreamServer() {}

  /************************ Class Methods ***********************/

  /** Server configuration. Supported DML attributes:
   * <PRE>
   *  ProtocolSession [ name server use SSF.OS.UDP.test.udpStreamServer
   *    port          %I      # server's well known port number
   *    client_limit  %I      # max number of simultaneous clients,
   *                          # if omitted, no limit
   *
   *    request_size  %I      # client request nominal datagram size (bytes, int)
   *    datagram_size %I      # max server datagram payload size (virtual bytes, int)
   *    send_interval %F      # time interval between consecutive server datagrams, sec
   *                          # if omitted, datagrams are sent back to back
   *
   *    show_report   %S      # print client-server session summary report, true/false
   *    debug         %S      # print verbose client/server diagnostics, true/false
   *  ]
   * </PRE>
   * @param cfg the DML configuration of this ProtocolSession
   * @throws configException if the superclass rejects the configuration or
   *         datagram_size is not a positive number
   * @throws NumberFormatException if a numeric attribute cannot be parsed
   */
  public void config(Configuration cfg) throws configException {
    super.config(cfg);
    String str;

    if((str = (String)cfg.findSingle("port")) != null) {
      wellKnownPort = Integer.parseInt(str);
      if (wellKnownPort >= 10000)
        serverInfo("Warning: udpServer port must be < 10,000.");
    }

    if((str = (String)cfg.findSingle("request_size")) != null)
      request_size = Integer.parseInt(str);

    if((str = (String)cfg.findSingle("datagram_size")) != null) {
      datagram_size = Integer.parseInt(str);
      // a slave server sends datagram_size bytes per write until the
      // request is used up; a non-positive size would never finish
      if (datagram_size <= 0)
        throw new configException("udpStreamServer: datagram_size must be > 0, got "
                                  + datagram_size);
    }

    if((str = (String)cfg.findSingle("client_limit")) != null)
      client_limit = Integer.parseInt(str);

    // debug is off unless explicitly set to "true"
    if("true".equals(cfg.findSingle("debug")))
      debug = true;

    // show_report is on unless explicitly set to "false"
    if("false".equals(cfg.findSingle("show_report")))
      show_report = false;

    // given in seconds in the DML file, kept in ticks
    if((str = (String)cfg.findSingle("send_interval")) != null)
      send_interval = SSF.Net.Net.seconds(Double.parseDouble(str));
  }

  /** Look up the host's socket layer, create and bind the listening
   *  socket and start waiting for the first client request. If the
   *  listening socket cannot be created the error is reported and the
   *  server stays inactive.
   */
  public void init(){
    localHost = (SSF.Net.Host)inGraph();
    localNHI = localHost.nhi;
    try {
      socketms = (socketMaster)localHost.SessionForName("socket");
      udpsocket = (udpSocket)socketms.socket(this, "udp");
      localIP = ((NIC)((IP)localHost.SessionForName("ip"))
                   .INTERFACES.elementAt(0)).ipAddr;
    }catch (ProtocolException e) {
      System.err.println("udpServer: " + e);
    }
    if (udpsocket == null)
      return;                          // no socket, nothing to listen on
    udpsocket.bind(-1, wellKnownPort);
    udpsocket.listen(client_limit);    // creates a UDP session
    obj = new Object[1];
    serv();
  }

  /** Wait for the next client request on the listening socket. A valid
   *  request is an Integer giving the number of bytes wanted; it is served
   *  by a new slaveServer unless client_limit is already reached. After
   *  every request, and after every read failure, the server goes back to
   *  waiting.
   */
  public void serv() {
    udpsocket.read(obj, request_size, new Continuation(){
      public void success() {
        if((obj == null) || !(obj[0] instanceof Integer)) {
          serverInfo("received unknown request");
        } else if(clientNumber >= client_limit) {
          if(debug)
            serverInfo("max # clients exceeded, refused");
        } else {
          // take the slot before starting the slave: in back-to-back
          // mode the slave finishes, and releases it, inside start()
          ++clientNumber;
          int doc_size = ((Integer)obj[0]).intValue();
          if(debug)
            serverInfo(" request " + doc_size + "B");

          // a slave server reuses the master server's port number,
          // and uses the local IP, remote port and remote IP that were
          // contained in the service request UDP header
          slaveServer srv = new slaveServer(doc_size,
                                  udpsocket.last_local_ip,
                                  wellKnownPort,
                                  udpsocket.last_remote_ip,
                                  udpsocket.last_remote_port );
          srv.start();
        }
        serv();
      }
      public void failure(int errno) {
        if(debug)
          serverInfo("failure reading clnt request");
        serv();
      }
    }); // end udpsocket.read
  }

  /** preamble to server diagnostics; before init() has set up the host
   *  and the listening socket only the configured port is printed */
  void serverInfo(String str){
    if ((localHost == null) || (udpsocket == null)) {
      System.out.println("udpServer :" + wellKnownPort + " " + str);
      return;
    }
    System.out.println(localHost.now()/(double)SSF.Net.Net.seconds(1.0)
            + " udpServer " + IP_s.IPtoString(localIP)
            + ":" + udpsocket.local_port + " " + str);
  }

  /** Not used by the server; always returns false. */
  public boolean push(ProtocolMessage message, ProtocolSession fromSession)
    throws ProtocolException {
    return false;
  }
}
